\documentclass[11pt]{article}
\usepackage{geometry}
\geometry{letterpaper}


\begin{document}

\title{Hadoop Fair Scheduler Design Document}
\author{}
\maketitle
\tableofcontents

\section{Introduction}

The Hadoop Fair Scheduler started as a simple means to share MapReduce clusters. Over time, it has grown in functionality to support hierarchical scheduling, preemption, and multiple ways of organizing and weighting jobs. This document explains the goals and features of the Fair Scheduler and its internal design.

\section{Fair Scheduler Goals}

The Fair Scheduler was designed with four main goals:
\begin{enumerate}
  \item Run small jobs quickly even if they are sharing a cluster with large jobs. Unlike Hadoop's built-in FIFO scheduler, fair scheduling lets small jobs make progress even if a large job is running, without starving the large job.
  \item Provide guaranteed service levels to ``production" jobs, to let them run alongside experimental jobs in a shared cluster.
  \item Be simple to administer and configure. The scheduler should do something reasonable ``out of the box," and users should only need to configure it as they discover that they want to use more advanced features.
  \item Support reconfiguration at runtime, without requiring a cluster restart.
\end{enumerate}

\section{Scheduler Features}

This section provides a quick overview of the features of the Fair Scheduler. A detailed usage guide is available in the Hadoop documentation in {\tt build/docs/fair\_scheduler.html}.

\subsection{Pools}

The Fair Scheduler groups jobs into ``pools" and performs fair sharing between these pools. Each pool can use either FIFO or fair sharing to schedule jobs internal to the pool. The pool that a job is placed in is determined by a JobConf property, the ``pool name property". By default, this is {\tt user.name}, so that there is one pool per user. However, different properties can be used, e.g.~{\tt group.name} to have one pool per Unix group.

A common trick is to set the pool name property to an unused property name such as {\tt pool.name} and make this default to {\tt user.name}, so that there is one pool per user but it is also possible to place jobs into ``special" pools by setting their {\tt pool.name} directly. The {\tt mapred-site.xml} snippet below shows how to do this:

\begin{verbatim}
<property>
  <name>mapred.fairscheduler.poolnameproperty</name>
  <value>pool.name</value>
</property>

<property>
  <name>pool.name</name>
  <value>${user.name}</value>
</property>
\end{verbatim}
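
The effect of this configuration can be sketched in a few lines of Python. This is an illustration only, not the scheduler's Java code: the helper names {\tt expand} and {\tt pool\_for\_job} and the fallback pool name {\tt default} are assumptions made for the example, and the variable expansion merely imitates what Hadoop's configuration machinery does with {\tt \$\{...\}} references.

\begin{verbatim}
import re

def expand(value, conf):
    """Replace ${name} references with values from conf (left as-is if unknown)."""
    return re.sub(r"\$\{([^}]+)\}",
                  lambda m: conf.get(m.group(1), m.group(0)),
                  value)

def pool_for_job(job_conf, site_defaults, pool_name_property="pool.name"):
    """Return the pool name for a job (hypothetical helper)."""
    # Job-level settings override the site-wide defaults.
    merged = dict(site_defaults)
    merged.update(job_conf)
    raw = merged.get(pool_name_property)
    if raw is None:
        return "default"  # assumed fallback pool name
    return expand(raw, merged)

site = {"pool.name": "${user.name}"}
print(pool_for_job({"user.name": "alice"}, site))
print(pool_for_job({"user.name": "alice", "pool.name": "production"}, site))
\end{verbatim}

The first call falls back to the per-user pool, while the second places the job in a ``special'' pool because the job sets {\tt pool.name} itself.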

\subsection{Minimum Shares}

Normally, active pools (those that contain jobs) will get equal shares of the map and reduce task slots in the cluster. However, it is also possible to set a \emph{minimum share} of map and reduce slots on a given pool, which is a number of slots that it will always get when it is active, even if its fair share would be below this number. This is useful for guaranteeing that production jobs get a certain desired level of service when sharing a cluster with non-production jobs. Minimum shares have three effects:
\begin{enumerate}
  \item The pool's fair share will always be at least as large as its minimum share. Slots are taken from the share of other pools to achieve this. The only exception is if the minimum shares of the active pools add up to more than the total number of slots in the cluster; in this case, each pool's share will be scaled down proportionally.
  \item Pools whose running task count is below their minimum share get assigned slots first when slots are available.
  \item It is possible to set a \emph{preemption timeout} on the pool after which, if it has not received enough task slots to meet its minimum share, it is allowed to kill tasks in other jobs to meet its share. Minimum shares with preemption timeouts thus act like SLAs.
\end{enumerate}

Note that when a pool is inactive (contains no jobs), its minimum share is not ``reserved" for it -- the slots are split up among the other pools.
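
As a worked example of the proportional scaling rule (the numbers and the symbols $s_A$, $s_B$ are purely illustrative), suppose a cluster has 100 map slots and two active pools $A$ and $B$ with minimum shares of 80 and 40 map slots. The minimum shares add up to 120, which exceeds 100, so each is scaled by $100/120$:
$$s_A = 80 \cdot \frac{100}{120} \approx 66.7, \qquad s_B = 40 \cdot \frac{100}{120} \approx 33.3.$$
The scaled shares again sum to the 100 available slots.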

\subsection{Preemption}

As explained above, the scheduler may kill tasks from a job in one pool in order to meet the minimum share of another pool. We call this preemption, although the term is a slight misnomer, since preemption normally means pausing: it is really the \emph{job} that is preempted, while the task is killed. The feature explained above is called \emph{min share preemption}. In addition, the scheduler supports \emph{fair share preemption}, which kills tasks when a pool's fair share is not being met. Fair share preemption is much more conservative than min share preemption, because pools without min shares are expected to contain non-production jobs, for which some amount of unfairness is tolerable. In particular, fair share preemption activates only if a pool has been below \emph{half} of its fair share for a configurable fair share preemption timeout, which we recommend setting fairly high (e.g.~10 minutes).

In both types of preemption, the scheduler kills the most recently launched tasks from over-scheduled pools, to minimize the amount of computation wasted by preemption.

\subsection{Running Job Limits}

The fair scheduler can limit the number of concurrently running jobs from each user and from each pool. This is useful for limiting the amount of intermediate data generated on the cluster. The jobs that will run are chosen in order of submit time and priority. Jobs submitted beyond the limit wait for one of the running jobs to finish.

\subsection{Job Priorities}

Within a pool, job priorities can be used to control the scheduling of jobs, whether the pool's internal scheduling mode is FIFO or fair sharing:
\begin{itemize}
  \item In FIFO pools, jobs are ordered first by priority and then by submit time, as in Hadoop's default scheduler.
  \item In fair sharing pools, job priorities are used as weights to control how much share a job gets. The normal priority corresponds to a weight of 1.0, and each successive priority level doubles the weight. For example, a high-priority job gets a weight of 2.0, and will therefore get twice the share of a normal-priority job.
\end{itemize}
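
The mapping from priorities to weights in fair sharing pools can be sketched as follows. This is an illustrative Python sketch rather than the scheduler's code: the function names are hypothetical, and it assumes the doubling rule extends symmetrically below normal priority (so that a low-priority job gets weight 0.5).

\begin{verbatim}
# Hadoop's job priority levels, lowest first.
PRIORITY_LEVELS = ["VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"]

def priority_weight(priority):
    """Weight of a job: 1.0 at NORMAL, doubling with each level above it."""
    steps = PRIORITY_LEVELS.index(priority) - PRIORITY_LEVELS.index("NORMAL")
    return 2.0 ** steps

def split_slots(slots, priorities):
    """Divide a pool's slots among jobs in proportion to their weights."""
    weights = [priority_weight(p) for p in priorities]
    total = sum(weights)
    return [slots * w / total for w in weights]

# A high-priority and a normal-priority job sharing 30 slots.
print(split_slots(30, ["HIGH", "NORMAL"]))
\end{verbatim}

With 30 slots, the high-priority job receives 20 and the normal-priority job receives 10, matching the 2:1 ratio of their weights.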

\subsection{Pool Weights}

Pools can be given weights to achieve unequal sharing of the cluster. For example, a pool with weight 2.0 gets 2x the share of a pool with weight 1.0.

\subsection{Delay Scheduling}

The Fair Scheduler contains an algorithm called delay scheduling to improve data locality. Jobs that cannot launch a data-local map task wait for some period of time before they are allowed to launch non-data-local tasks, ensuring that they will run locally if some node in the cluster has the relevant data. Delay scheduling is described in detail in Section \ref{sec:delay-scheduling}.

\subsection{Administration}

The Fair Scheduler includes a web UI for displaying the active pools and jobs and their fair shares, moving jobs between pools, and changing job priorities.
In addition, the Fair Scheduler's allocation file (specifying min shares and preemption timeouts for the pools) is automatically reloaded if it is modified on disk, to allow runtime reconfiguration.
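
For orientation, an allocation file might look roughly like the fragment below. The pool and user names and all numbers are made up for illustration, and the element names are given as we recall them from the usage guide; they should be checked against the guide for the Hadoop version in use. Timeouts are assumed to be in seconds.

\begin{verbatim}
<?xml version="1.0"?>
<allocations>
  <!-- Hypothetical production pool with min shares and an SLA-like timeout -->
  <pool name="production">
    <minMaps>20</minMaps>
    <minReduces>10</minReduces>
    <weight>2.0</weight>
    <minSharePreemptionTimeout>300</minSharePreemptionTimeout>
  </pool>
  <!-- Hypothetical per-user running job limit -->
  <user name="alice">
    <maxRunningJobs>6</maxRunningJobs>
  </user>
  <userMaxJobsDefault>3</userMaxJobsDefault>
  <fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
</allocations>
\end{verbatim}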

\section{Implementation}

\subsection{Hadoop Scheduling Background}

Hadoop jobs consist of a number of map and reduce \emph{tasks}. These tasks run in \emph{slots} on the nodes of the cluster. Each node is configured with a number of map slots and reduce slots based on its computational resources (typically one slot per core). The role of the scheduler is to assign tasks to any slots that are free.

All schedulers in Hadoop, including the Fair Scheduler, inherit from the {\tt TaskScheduler} abstract class. This class provides access to a {\tt TaskTrackerManager} -- an interface to the JobTracker -- as well as a {\tt Configuration} instance. It also asks the scheduler to implement three abstract methods: the lifecycle methods {\tt start} and {\tt terminate}, and a method called {\tt assignTasks} to launch tasks on a given TaskTracker.
Task assignment in Hadoop is reactive. TaskTrackers periodically send heartbeats to the JobTracker with their {\tt TaskTrackerStatus}, which contains a list of running tasks, the number of slots on the node, and other information. The JobTracker then calls {\tt assignTasks} on the scheduler to obtain tasks to launch. These are returned with the heartbeat response.

Apart from reacting to heartbeats through {\tt assignTasks}, schedulers can also be notified when jobs have been submitted to the cluster, killed, or removed by adding listeners to the {\tt TaskTrackerManager}. The Fair Scheduler sets up these listeners in its {\tt start} method. An important role of the listeners is to initialize jobs that are submitted -- until a job is initialized, it cannot launch tasks. The Fair Scheduler currently initializes all jobs right away, but it may also be desirable to hold off initializing jobs if too many are submitted to limit memory usage on the JobTracker.

Selection of tasks \emph{within} a job is mostly done by the {\tt JobInProgress} class, and not by individual schedulers. {\tt JobInProgress} exposes two methods, {\tt obtainNewMapTask} and {\tt obtainNewReduceTask}, to launch a task of either type. Both methods may either return a {\tt Task} object or {\tt null} if the job does not wish to launch a task. Whether a job wishes to launch a task may change back and forth during its lifetime. Even after all tasks in the job have been started, the job may wish to run another task for speculative execution. In addition, if the node containing a map task failed, the job will wish to re-run it to rebuild its output for use in the reduce tasks. Schedulers may therefore need to poll multiple jobs until they find one with a task to run.

Finally, for map tasks, an important scheduling criterion is data locality: running the task on a node or rack that contains its input data. Normally, {\tt JobInProgress.obtainNewMapTask} returns the ``closest'' map task to a given node. However, to give schedulers slightly more control over data locality, there is also a version of {\tt obtainNewMapTask} that allows the scheduler to cap the level of non-locality allowed for the task (e.g.~request a task only on the same node, or {\tt null} if none is available). The Fair Scheduler uses this method with an algorithm called delay scheduling (Section \ref{sec:delay-scheduling}) to optimize data locality.

\subsection{Fair Scheduler Basics}

At a high level, the Fair Scheduler uses hierarchical scheduling to assign tasks. First it selects a pool to assign a task to according to the fair sharing algorithm in Section \ref{sec:fair-sharing-alg}. Then it asks the pool to obtain a task. The pool chooses among its jobs according to its internal scheduling order (FIFO or fair sharing).

In fact, because jobs might not have tasks to launch ({\tt obtainNew(Map|Reduce)Task} can return {\tt null}), the scheduler actually establishes an ordering on jobs and asks them for tasks in turn. Within a pool, jobs are sorted either by priority and start time (for FIFO) or by distance below fair share. If the first job in the ordering does not have a task to launch, the pool asks the second job, then the third, and so on. Pools themselves are sorted by distance below min share and fair share, so if the first pool does not have any jobs that can launch tasks, the second pool is asked, and so on. This makes it straightforward to implement features like delay scheduling (Section \ref{sec:delay-scheduling}) that may cause jobs to ``pass'' on a slot.
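
The two-level ``ask in order until someone accepts'' loop can be sketched in Python as follows. This is a deliberately simplified illustration, not the scheduler's code: the class and function names are hypothetical, pools are ordered by running task count alone (ignoring weights and min shares), and every pool uses FIFO ordering internally.

\begin{verbatim}
class Job:
    def __init__(self, name, priority, start_time, pending_tasks):
        self.name = name
        self.priority = priority          # larger means more important
        self.start_time = start_time
        self.pending_tasks = list(pending_tasks)

    def obtain_task(self):
        """Return a task, or None if the job passes on this slot."""
        return self.pending_tasks.pop(0) if self.pending_tasks else None

class Pool:
    def __init__(self, name, jobs, running_tasks=0):
        self.name = name
        self.jobs = jobs
        self.running_tasks = running_tasks

    def assign_task(self):
        # FIFO order: highest priority first, then earliest start time.
        ordered = sorted(self.jobs, key=lambda j: (-j.priority, j.start_time))
        for job in ordered:
            task = job.obtain_task()
            if task is not None:
                self.running_tasks += 1
                return task
        return None  # no job in this pool wants the slot

def assign_task(pools):
    # Simplified pool order: fewest running tasks first.
    for pool in sorted(pools, key=lambda p: p.running_tasks):
        task = pool.assign_task()
        if task is not None:
            return task
    return None

idle = Pool("idle", [Job("j0", 0, 0, [])])
busy = Pool("busy", [Job("j1", 0, 5, ["m1"]), Job("j2", 1, 9, ["m2"])],
            running_tasks=3)
print(assign_task([busy, idle]))
\end{verbatim}

Here the pool {\tt idle} is asked first but has nothing to launch, so the slot goes to the higher-priority job in {\tt busy}.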

Apart from the {\tt assignTasks} code path, the Fair Scheduler also has a periodic update thread that calls {\tt update} every few seconds. This thread is responsible for recomputing fair shares to display them on the UI (Section \ref{sec:fair-share-computation}), checking whether jobs need to be preempted (Section \ref{sec:preemption}), and checking whether the allocations file has changed to reload pool allocations (through {\tt PoolManager}).

\subsection{The {\tt Schedulable} Class}

To allow the same fair sharing algorithm to be used both between pools and within a pool, the Fair Scheduler uses an abstract class called {\tt Schedulable} to represent both pools and jobs. Its subclasses for these roles are {\tt PoolSchedulable} and {\tt JobSchedulable}. A {\tt Schedulable} is responsible for three roles:
\begin{enumerate}
  \item It can be asked to obtain a task through {\tt assignTask}. This may return {\tt null} if the {\tt Schedulable} has no tasks to launch.
  \item It can be queried for information about the pool/job to use in scheduling, such as:
  \begin{itemize}
    \item Number of running tasks.
    \item Demand (number of tasks the {\tt Schedulable} \emph{wants} to run; this is equal to number of running tasks + number of unlaunched tasks).
    \item Min share assigned through config file.
    \item Weight (for fair sharing).
    \item Priority and start time (for FIFO scheduling).
  \end{itemize}
  \item It can be assigned a fair share through {\tt setFairShare}.
\end{enumerate}

There are separate {\tt Schedulable}s for map and reduce tasks, to make it possible to use the same algorithm on both types of tasks.
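
The three roles listed above can be summarized as an interface. The following Python sketch is only an analogy for the Java abstract class: the method names are adapted to Python conventions, and the concrete {\tt JobSchedulable} shown here is a toy stand-in that tracks task names in lists.

\begin{verbatim}
from abc import ABC, abstractmethod

class Schedulable(ABC):
    """Anything that can be given slots: a pool or a job."""

    def __init__(self, weight=1.0, min_share=0):
        self.weight = weight
        self.min_share = min_share
        self.fair_share = 0.0

    @abstractmethod
    def assign_task(self):
        """Return a task to launch, or None if there is none."""

    @abstractmethod
    def running_tasks(self):
        """Number of tasks currently running."""

    @abstractmethod
    def unlaunched_tasks(self):
        """Number of tasks not yet launched."""

    def demand(self):
        # Demand = running tasks + unlaunched tasks.
        return self.running_tasks() + self.unlaunched_tasks()

    def set_fair_share(self, share):
        self.fair_share = share

class JobSchedulable(Schedulable):
    """Toy job: a list of pending task names and a list of running ones."""

    def __init__(self, pending, **kwargs):
        super().__init__(**kwargs)
        self.pending = list(pending)
        self.running = []

    def assign_task(self):
        if not self.pending:
            return None
        task = self.pending.pop(0)
        self.running.append(task)
        return task

    def running_tasks(self):
        return len(self.running)

    def unlaunched_tasks(self):
        return len(self.pending)
\end{verbatim}

Note that launching a task moves it from unlaunched to running, so the demand of a {\tt Schedulable} does not change when it is assigned a task.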

\subsection{Fair Sharing Algorithm}
\label{sec:fair-sharing-alg}

A simple way to achieve fair sharing is the following: whenever a slot is available, assign it to the pool that has the fewest running tasks. This will ensure that all pools get an equal number of slots, unless a pool's demand is less than its fair share, in which case the extra slots are divided evenly among the other pools. Two features of the Fair Scheduler complicate this algorithm a little:
\begin{itemize}
  \item Pool weights mean that some pools should get more slots than others. For example, a pool with weight 2 should get twice as many slots as a pool with weight 1. This is accomplished by changing the scheduling rule to ``assign the slot to the pool whose value of $\mathit{runningTasks}/\mathit{weight}$ is smallest.''
  \item Minimum shares mean that pools below their min share should get slots first. When we sort pools to choose which ones to schedule next, we place pools below their min share ahead of pools above their min share. We order the pools below their min share by how far they are below it as a percentage of the share.
\end{itemize}

This fair sharing algorithm is implemented in {\tt FairShareComparator} in the {\tt SchedulingAlgorithms} class. The comparator orders {\tt Schedulable}s (pools or jobs) by distance below min share and then by $\mathit{runningTasks}/\mathit{weight}$.
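
The ordering described in this section can be sketched as a comparator in Python. This is a simplified illustration, not a transcription of {\tt FairShareComparator}: the {\tt Sched} record and function name are hypothetical, the min share is capped by demand as an assumption, and ties are left unbroken.

\begin{verbatim}
from dataclasses import dataclass
from functools import cmp_to_key

@dataclass
class Sched:
    name: str
    running_tasks: int
    min_share: int
    demand: int
    weight: float = 1.0

def fair_share_compare(a, b):
    """Negative if a should be offered a slot before b."""
    # Assumption: a pool never needs more than it demands.
    a_min = min(a.min_share, a.demand)
    b_min = min(b.min_share, b.demand)
    a_needy = a.running_tasks < a_min
    b_needy = b.running_tasks < b_min
    if a_needy != b_needy:
        return -1 if a_needy else 1       # below-min-share pools go first
    if a_needy:
        # Both below min share: compare the fraction of min share in use.
        x = a.running_tasks / max(a_min, 1)
        y = b.running_tasks / max(b_min, 1)
    else:
        # Neither below min share: compare running tasks per unit weight.
        x = a.running_tasks / a.weight
        y = b.running_tasks / b.weight
    return (x > y) - (x < y)

pools = [
    Sched("a", running_tasks=6, min_share=0, demand=10, weight=1.0),
    Sched("b", running_tasks=8, min_share=0, demand=10, weight=2.0),
    Sched("c", running_tasks=2, min_share=4, demand=10),
    Sched("d", running_tasks=1, min_share=10, demand=10),
]
order = sorted(pools, key=cmp_to_key(fair_share_compare))
print([p.name for p in order])
\end{verbatim}

Pools {\tt d} and {\tt c} come first because they are below their min shares (at 10\% and 50\% of them, respectively); {\tt b} precedes {\tt a} because it has fewer running tasks per unit of weight.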

\subsection{Preemption}
\label{sec:preemption}

To determine when to preempt tasks, the Fair Scheduler maintains two values for each {\tt PoolSchedulable}: the last time when the pool was at its min share, and the last time when the pool was at half its fair share. These conditions are checked periodically by the update thread in {\tt FairScheduler.updatePreemptionVariables}, using the methods {\tt isStarvedForMinShare} and {\tt isStarvedForFairShare}. These methods also take into account the demand of the pool, so that a pool is not counted as starving if its demand is below its min/fair share but is otherwise met.
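
The bookkeeping described above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the scheduler's code: the class and method names are hypothetical, times are plain numbers in seconds, and the starvation tests are the simplest ones consistent with the description (the target is capped by the pool's demand).

\begin{verbatim}
def is_starved_for_min_share(running, min_share, demand):
    # A pool whose demand is met is not starved, even below its min share.
    return running < min(min_share, demand)

def is_starved_for_fair_share(running, fair_share, demand):
    # Fair share preemption only considers pools below HALF their share.
    return running < min(fair_share / 2.0, demand)

class PreemptionState:
    """Per-pool timestamps updated periodically by the update thread."""

    def __init__(self, now):
        self.last_time_at_min_share = now
        self.last_time_at_half_fair_share = now

    def update(self, now, running, min_share, fair_share, demand):
        if not is_starved_for_min_share(running, min_share, demand):
            self.last_time_at_min_share = now
        if not is_starved_for_fair_share(running, fair_share, demand):
            self.last_time_at_half_fair_share = now

    def should_preempt(self, now, min_share_timeout, fair_share_timeout):
        return (now - self.last_time_at_min_share > min_share_timeout
                or now - self.last_time_at_half_fair_share > fair_share_timeout)
\end{verbatim}

A pool that stays starved never has its timestamps refreshed, so the elapsed time eventually exceeds the corresponding timeout and preemption is triggered.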

When preempting tasks, the scheduler kills the most recently launched tasks from over-scheduled pools. This minimizes the amount of computation wasted by preemption and ensures that all jobs can eventually finish (it is as if the preempted jobs just never got their last few slots). The tasks are chosen and preempted in {\tt preemptTasks}.
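
Victim selection can be sketched as follows. This Python fragment is illustrative only: the function name and the tuple layout are hypothetical, and it omits safeguards a real implementation needs, such as not pushing an over-scheduled pool below its own share.

\begin{verbatim}
def tasks_to_preempt(running_tasks, over_scheduled_pools, count):
    """Pick up to `count` victims, most recently launched first.

    running_tasks: list of (task_id, pool_name, launch_time) tuples.
    """
    candidates = [t for t in running_tasks if t[1] in over_scheduled_pools]
    # Newest tasks have done the least work, so killing them wastes least.
    candidates.sort(key=lambda t: t[2], reverse=True)
    return [task_id for task_id, _, _ in candidates[:count]]

running = [
    ("t1", "adhoc", 100),
    ("t2", "adhoc", 250),
    ("t3", "production", 300),
    ("t4", "adhoc", 200),
]
print(tasks_to_preempt(running, {"adhoc"}, 2))
\end{verbatim}

In this example {\tt t3} is never a candidate because its pool is not over-scheduled, and {\tt t2} and {\tt t4} are chosen ahead of the older {\tt t1}.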

Note that for min share preemption, it is clear when a pool is below its min share because the min share is given as a number of slots, but for fair share preemption, we must be able to compute a pool's fair share to determine when it is being starved. This computation is trickier than dividing the number of slots by the number of pools due to weights, min shares and demands. Section \ref{sec:fair-share-computation} explains how fair shares are computed.

\subsection{Fair Share Computation}
\label{sec:fair-share-computation}

The scheduling algorithm in Section \ref{sec:fair-sharing-alg} achieves fair shares without actually needing to compute pools' numerical shares beforehand. However, for preemption and for displaying shares in the Web UI, we want to know what a pool's fair share is even if the pool is not currently at its share. That is, we want to know how many slots the pool \emph{would} get if we started with all slots being empty and ran the algorithm in Section \ref{sec:fair-sharing-alg} until we filled them.
One way to compute these shares would be to simulate starting out with empty slots and calling {\tt assignTasks} repeatedly until they filled, but this is expensive, because each scheduling decision takes $O(numJobs)$ time and we need to make $O(numSlots)$ decisions.

To compute fair shares efficiently, the Fair Scheduler includes an algorithm based on binary search in {\tt SchedulingAlgorithms.computeFairShares}. This algorithm is based on the following observation. If all slots had been assigned according to weighted fair sharing respecting pools' demands and min shares, then there would exist a ratio $r$ such that:
\begin{enumerate}
  \item Pools whose demand $d_i$ is less than $r w_i$ (where $w_i$ is the weight of the pool) are assigned $d_i$ slots.
  \item Pools whose min share $m_i$ is more than $r w_i$ are assigned $\min(m_i, d_i)$ slots.
  \item All other pools are assigned $r w_i$ slots.
  \item The pools' shares sum up to the total number of slots $t$.
\end{enumerate}

The Fair Scheduler uses binary search to compute the correct $r$. We define a function $f(r)$ as the number of slots that would be used for a given $r$ if conditions 1-3 above were met, and then find a value of $r$ that makes $f(r)=t$. More precisely, $f(r)$ is defined as:
$$f(r) = \sum_i \min(d_i, \max(r w_i, m_i)).$$

Note that $f(r)$ is nondecreasing in $r$ because every term of the sum is nondecreasing, so the equation $f(r) = t$ can be solved by binary search. We choose 0 as a lower bound of our binary search because with $r=0$, only min shares are assigned. (An earlier check in {\tt computeFairShares} checks whether the min shares add up to more than the total number of slots, and if so, computes fair shares by scaling down the min shares proportionally and returns.) To compute an upper bound for the binary search, we try $r=1,2,4,8,\dots$ until we find a value large enough that either more than $t$ slots are used or all pools' demands are met (in case the demands added up to less than $t$).

The steps of the algorithm are explained in detail in {\tt SchedulingAlgorithms.java}.

This algorithm runs in time $O(NP)$, where $N$ is the number of jobs/pools and $P$ is the desired number of bits of precision in the computed values (number of iterations of binary search), which we've set to 25. It thus scales linearly in the number of jobs and pools.
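
The binary search can be sketched in Python as follows. This is a minimal sketch of the approach described in this section, not a transcription of {\tt SchedulingAlgorithms.computeFairShares}: the function names and the tuple representation of a pool are assumptions, and weights are assumed to be positive.

\begin{verbatim}
def slots_used(r, pools):
    """f(r): slots used at ratio r; pools are (demand, weight, min_share)."""
    return sum(min(d, max(r * w, m)) for d, w, m in pools)

def compute_fair_shares(pools, total_slots, iterations=25):
    # Special case: min shares alone exceed the cluster; scale them down.
    total_min = sum(m for _, _, m in pools)
    if total_min > total_slots:
        return [m * total_slots / total_min for _, _, m in pools]

    # We cannot hand out more slots than are demanded in total.
    target = min(total_slots, sum(d for d, _, _ in pools))

    # Find an upper bound by trying r = 1, 2, 4, 8, ...
    hi = 1.0
    while slots_used(hi, pools) < target:
        hi *= 2.0

    # Binary search for the smallest r with f(r) >= target.
    lo = 0.0
    for _ in range(iterations):
        mid = (lo + hi) / 2.0
        if slots_used(mid, pools) < target:
            lo = mid
        else:
            hi = mid
    return [min(d, max(hi * w, m)) for d, w, m in pools]

# Two large pools with weights 1 and 2, plus a pool that only wants 5 slots.
print(compute_fair_shares([(100, 1.0, 0), (100, 2.0, 0), (5, 1.0, 0)], 65))
\end{verbatim}

In the example, the small pool receives its full demand of 5 slots and the remaining 60 slots are split 1:2 between the other two pools, which corresponds to $r = 20$.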

\subsection{Running Job Limits}

Running job limits are implemented by marking jobs as not runnable if there are too many jobs submitted by the same user or pool. This is done in {\tt FairScheduler.updateRunnability}. A job that is not runnable declares its demand as 0 and always returns {\tt null} from {\tt assignTask}.
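
The runnability check can be sketched in Python as follows. This is an illustration under assumptions, not the logic of {\tt updateRunnability} itself: the function name and job representation are hypothetical, and jobs are considered in order of priority and then submit time.

\begin{verbatim}
from collections import defaultdict

def update_runnability(jobs, user_limits, pool_limits):
    """Return the names of runnable jobs, in the order they were admitted.

    jobs: list of dicts with keys name, user, pool, priority, submit_time.
    Users and pools without an entry in the limit dicts are unlimited.
    """
    unlimited = float("inf")
    user_count = defaultdict(int)
    pool_count = defaultdict(int)
    runnable = []
    # Assumed order: higher priority first, then earlier submit time.
    ordered = sorted(jobs, key=lambda j: (-j["priority"], j["submit_time"]))
    for job in ordered:
        user, pool = job["user"], job["pool"]
        if (user_count[user] < user_limits.get(user, unlimited)
                and pool_count[pool] < pool_limits.get(pool, unlimited)):
            runnable.append(job["name"])
            user_count[user] += 1
            pool_count[pool] += 1
    return runnable

jobs = [
    {"name": "a", "user": "alice", "pool": "etl", "priority": 0, "submit_time": 1},
    {"name": "b", "user": "alice", "pool": "etl", "priority": 0, "submit_time": 2},
    {"name": "c", "user": "bob", "pool": "etl", "priority": 0, "submit_time": 3},
    {"name": "d", "user": "alice", "pool": "adhoc", "priority": 1, "submit_time": 4},
]
print(update_runnability(jobs, {"alice": 2}, {"etl": 2}))
\end{verbatim}

Job {\tt b} is held back because {\tt alice} already has two runnable jobs; it becomes runnable on a later pass once one of them finishes.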

\subsection{Delay Scheduling}
\label{sec:delay-scheduling}

In Hadoop, running map tasks on the nodes or racks that contain their input data is critical for performance, because it avoids shipping the data over the network. However, always assigning slots to the first job in order of pool shares and in-pool ordering (the ``head-of-line job") can sometimes lead to poor locality:
\begin{itemize}
  \item If the head-of-line job is small, the chance of it having data on the node that a heartbeat was received from is small. Therefore, locality would be poor in a small-job workload if we always assigned slots to the head-of-line job.
  \item When fair sharing is used, there is a strong bias for a job to be reassigned into a slot that it just finished a task in, because when it finishes the task, the job falls below its fair share. This can mean that jobs have a difficult time running in slots that other jobs have taken and thus achieve poor locality.
\end{itemize}

To deal with both of these situations, the Fair Scheduler can sacrifice fairness temporarily to improve locality through an algorithm called delay scheduling. If the head-of-line job cannot launch a local task on the TaskTracker that sent a heartbeat, then it is skipped, and other running jobs are looked at in order of pool shares and in-pool scheduling rules to find a job with a local task. However, if the head-of-line job has been skipped for a sufficiently long time, it is allowed to launch rack-local tasks. Then, if it is skipped for a longer time, it is also allowed to launch off-rack tasks. These skip times are called locality delays. Delays of a few seconds are sufficient to drastically increase locality.

The Fair Scheduler allows locality delays to be set through {\tt mapred-site.xml} or to be turned off by setting them to zero. However, by default, it computes the delay automatically based on the heartbeat interval of the cluster. The delay is set to 1.5x the heartbeat interval.

When a job that has been allowed to launch non-local tasks ends up launching a local task again, its ``locality level" resets and it must wait again before launching non-local tasks. This is done so that a job that gets ``unlucky" early in its lifetime does not continue to launch non-local tasks throughout its life.

Delay scheduling is implemented by keeping track of two variables on each job: the locality level of the last map it launched (0 for node-local, 1 for rack-local and 2 for off-rack) and the time it has spent being skipped for a task. These are kept in a {\tt JobInfo} structure associated with each job in {\tt FairScheduler.java}. Whenever a job is asked for tasks, it checks the locality level it is allowed to launch them at through {\tt FairScheduler.getAllowedLocalityLevel}. If it does not launch a task, it is marked as ``visited'' on that heartbeat by appending itself to a {\tt visited} job list that is passed around between calls to {\tt assignTask} on the same heartbeat. Jobs that are visited on a heartbeat but do not launch any tasks during it are considered as skipped for the time interval between this heartbeat and the next. Code at the beginning of {\tt FairScheduler.assignTasks} increments the wait time of each skipped job by the time elapsed since the last heartbeat. Once a job has been skipped for more than the locality delay, {\tt getAllowedLocalityLevel} starts returning a higher (that is, less local) locality level, so that the job is allowed to launch less-local tasks. Whenever the job launches a task, its wait time is reset, but we remember the locality level of the launched task so that the job is allowed to launch more tasks at this level without further waiting.
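
The state machine described above can be sketched in Python as follows. This is an illustrative sketch, not the code in {\tt FairScheduler.java}: the names are hypothetical, and it assumes a single locality delay that is applied once to move from node-local to rack-local and once more to move to off-rack.

\begin{verbatim}
NODE_LOCAL, RACK_LOCAL, OFF_RACK = 0, 1, 2

class JobLocalityInfo:
    def __init__(self):
        self.last_map_locality_level = NODE_LOCAL
        self.time_waited = 0.0
        self.skipped = False   # visited without launching on last heartbeat

def on_heartbeat(info, elapsed):
    """Called at the start of each heartbeat: charge time to skipped jobs."""
    if info.skipped:
        info.time_waited += elapsed
        info.skipped = False

def on_skipped(info):
    info.skipped = True

def on_task_launched(info, level):
    # Reset the wait, but remember the level that was reached.
    info.last_map_locality_level = level
    info.time_waited = 0.0
    info.skipped = False

def get_allowed_locality_level(info, locality_delay):
    level, waited = info.last_map_locality_level, info.time_waited
    if level == NODE_LOCAL:
        if waited > 2 * locality_delay:
            return OFF_RACK
        if waited > locality_delay:
            return RACK_LOCAL
        return NODE_LOCAL
    if level == RACK_LOCAL:
        return OFF_RACK if waited > locality_delay else RACK_LOCAL
    return OFF_RACK

info = JobLocalityInfo()
delay = 4.5                       # e.g. 1.5x a 3-second heartbeat interval
for _ in range(2):                # skipped on two consecutive heartbeats
    on_skipped(info)
    on_heartbeat(info, 3.0)
print(get_allowed_locality_level(info, delay))
\end{verbatim}

After being skipped for 6 seconds in total, the job in the example exceeds the 4.5-second delay and is allowed to launch rack-local tasks. Launching a node-local task afterwards would return it to the node-local level, as described earlier in this section.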

\subsection{Locking Order}

Fair Scheduler data structures can be touched by several threads. Most commonly, the JobTracker invokes {\tt assignTasks}. This happens inside a block of code where the JobTracker has locked itself already. Therefore, to prevent deadlocks, we always ensure that \emph{if both the FairScheduler and the JobTracker must be locked, the JobTracker is locked first}. Other threads that can lock the FairScheduler include the update thread and the web UI.

\subsection{Unit Tests}

The Fair Scheduler contains extensive unit tests using mock {\tt TaskTrackerManager}, {\tt JobInProgress}, {\tt TaskInProgress}, and {\tt Schedulable} objects. Scheduler tests are in {\tt TestFairScheduler.java}. The {\tt computeFairShares} algorithm is tested separately in {\tt TestComputeFairShares.java}. All tests use accelerated time via a fake {\tt Clock} class.

\pagebreak
\section{Code Guide}

The following table lists some key source files in the Fair Scheduler:

\begin{center}
\begin{tabular}{|l|p{0.7\columnwidth}|}
  \hline
  {\bf File} & {\bf Contents} 
  \\ \hline
  {\tt FairScheduler.java} & Scheduler entry point. Also contains update thread, and logic for preemption, delay scheduling, and running job limits.
  \\ \hline
  {\tt Schedulable.java} & Definition of the {\tt Schedulable} class. Extended by {\tt JobSchedulable} and {\tt PoolSchedulable}.
  \\ \hline
  {\tt SchedulingAlgorithms.java} & Contains FIFO and fair sharing comparators, as well as the {\tt computeFairShares} algorithm in Section \ref{sec:fair-share-computation}.
  \\ \hline
  {\tt PoolManager.java} & Reads pool properties from the allocation file and maintains a collection of {\tt Pool} objects. Pools are created on demand.
  \\ \hline
  {\tt Pool.java} & Represents a pool and stores its map and reduce {\tt Schedulables}.
  \\ \hline
  {\tt FairSchedulerServlet.java} & Implements the scheduler's web UI.
  \\ \hline
  {\tt FairSchedulerEventLog.java} & An easy-to-parse event log for debugging. Must be enabled through {\tt mapred.fairscheduler.eventlog.enabled}.
  If enabled, logs are placed in {\tt \$HADOOP\_LOG\_DIR/fairscheduler}.
  \\ \hline
  {\tt TaskSelector.java} & A pluggable class responsible for picking tasks within a job. Currently, {\tt DefaultTaskSelector} delegates to {\tt JobInProgress}, but this would be a useful place to experiment with new algorithms for speculative execution and locality.
  \\ \hline
  {\tt LoadManager.java} & A pluggable class responsible for determining when to launch more tasks on a TaskTracker. Currently, {\tt CapBasedLoadManager} uses slot counts, but this would be a useful place to experiment with scheduling based on machine load.
  \\ \hline
  {\tt WeightAdjuster.java} & A pluggable class responsible for setting job weights. An example, {\tt NewJobWeightBooster}, is provided, which increases weight temporarily for new jobs.
  \\ \hline
\end{tabular}
\end{center}

\end{document}